00 Product Overview
AeroMQ is a high‑performance message queue built for high‑concurrency, low‑latency workloads. It combines Netty asynchronous I/O, lock‑free SPSC ring buffers, and pooled off‑heap memory to provide a stable, observable messaging channel.
01 Key Features
RequestId → CompletableFuture
Lock‑free, hybrid wait
Pooled DirectBuffer
Async I/O
CSV + Charts
Stats & Logs
02 Architecture & Layout
AeroMQ/
├─ aeromq-protocol # Protocol & Commands
├─ aeromq-core # Broker Core
├─ aeromq-client # Client SDK
├─ aeromq-benchmark # Benchmark Suite
└─ docs # Static Docs
- The protocol uses length‑prefixed frames; commands are defined centrally (Commands)
- The core engine combines SPSC queues with off‑heap memory to minimize GC exposure
03 Modules Overview
aeromq-protocol
Defines the protocol and command set
aeromq-core
Broker core: SPSC + off‑heap
aeromq-client
Client SDK with request mapping
aeromq-benchmark
Benchmarks and visualization
docs
Static docs and samples
03A Typical Use Cases
Real‑time Monitoring/Alerting
Low latency, high fan‑out
Transaction & Settlement Events
At‑least‑once semantics
IoT / Streaming
Backpressure aware
Microservice Decoupling
Async pipelines
03B Protocol Details
- Length‑prefixed binary frames; the header carries requestId, command, and length
// Frame (pseudo)
struct Header {
int32 length;
int16 command; // e.g. SEND
int64 requestId;
}
// Payload: message body
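The header layout above can be exercised with a small codec. This is a minimal sketch, not AeroMQ's actual implementation: it assumes big‑endian byte order, assumes that length counts every byte after the length field itself (command + requestId + payload), and uses a hypothetical command code CMD_SEND = 1. The names FrameCodec and Frame are illustrative only.

```java
import java.nio.ByteBuffer;

final class FrameCodec {
    static final int LENGTH_BYTES = 4;          // int32 length
    static final int FIXED_BODY_BYTES = 2 + 8;  // int16 command + int64 requestId
    static final short CMD_SEND = 1;            // hypothetical command code

    static final class Frame {
        final short command;
        final long requestId;
        final byte[] payload;
        Frame(short command, long requestId, byte[] payload) {
            this.command = command;
            this.requestId = requestId;
            this.payload = payload;
        }
    }

    /** Encodes one frame; the returned buffer is flipped and ready to be written. */
    static ByteBuffer encode(short command, long requestId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_BYTES + FIXED_BODY_BYTES + payload.length);
        buf.putInt(FIXED_BODY_BYTES + payload.length); // assumption: length excludes itself
        buf.putShort(command);
        buf.putLong(requestId);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    /** Decodes one frame, or returns null (consuming nothing) if the frame is incomplete. */
    static Frame decode(ByteBuffer in) {
        if (in.remaining() < LENGTH_BYTES) return null;
        int length = in.getInt(in.position());  // absolute read: does not advance the position
        if (length < FIXED_BODY_BYTES) {
            throw new IllegalArgumentException("bad frame length: " + length);
        }
        if (in.remaining() < LENGTH_BYTES + length) return null;
        in.getInt();                             // now consume the length field
        short command = in.getShort();
        long requestId = in.getLong();
        byte[] payload = new byte[length - FIXED_BODY_BYTES];
        in.get(payload);
        return new Frame(command, requestId, payload);
    }
}
```

Returning null for a partial frame lets the caller keep accumulating bytes and retry, which is the usual way a length prefix is used on a streaming transport.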
04 Quick Start
Windows
install-maven.bat // Optional: installs Maven automatically
quick-start-fixed.bat // Recommended: fixes Chinese character encoding
Linux / macOS
chmod +x quick-start.sh
./quick-start.sh
If garbled Chinese characters (mojibake) appear in the Windows console, run chcp 65001 before running the scripts.
01 ThreadLocal → RequestId → CompletableFuture (High‑Concurrency, Multiple In‑Flight Requests)
Goals & Motivation
Remove the single‑thread limitation of ThreadLocal by keeping a per‑connection map from requestId to its pending future, so each response is routed to exactly the request that issued it.
Key Components & Data Structures
AtomicLong requestIdGen
ConcurrentHashMap<Long, CompletableFuture<Response>> pendingRequests
- Timeout handling via a ScheduledExecutorService or a DelayQueue
- The protocol carries requestId (frame header / protobuf field)
Client Send Flow
long reqId = requestIdGen.incrementAndGet();
CompletableFuture<Response> f = new CompletableFuture<>();
pendingRequests.put(reqId, f);
// ... serialize (with requestId) & write to channel
scheduler.schedule(() -> {
var g = pendingRequests.remove(reqId);
if (g != null) g.completeExceptionally(new TimeoutException());
}, timeout, MILLISECONDS);
return f;
Client Receive Flow
// decode to requestId + payload
var f = pendingRequests.remove(requestId);
if (f != null) f.complete(response);
else log.warn("late/unknown response");
Edge Cases & Notes
- Limit the number of pending requests; reject new requests (or return 429) once the threshold is exceeded
- Wrap‑around of a 64‑bit requestId can generally be ignored; if IDs are ever recycled, make sure old entries have been purged first
- On disconnect, iterate over pending and complete every future exceptionally to avoid leaking resources
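The send flow, receive flow, and edge cases above can be combined into one small class. The following is a sketch under stated assumptions rather than the client SDK's real code: the class name PendingRequests, the writeFrame callback (which stands in for "serialize with requestId and write to the channel"), and the use of RejectedExecutionException for overflow are all hypothetical. The pending limit is only approximate under concurrent senders, because size() and put() are not one atomic step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class PendingRequests<R> {
    private final AtomicLong requestIdGen = new AtomicLong();
    private final ConcurrentHashMap<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler;
    private final int maxPending;

    PendingRequests(ScheduledExecutorService scheduler, int maxPending) {
        this.scheduler = scheduler;
        this.maxPending = maxPending;
    }

    /** Registers a request, arms its timeout, then hands the new id to writeFrame. */
    CompletableFuture<R> send(LongConsumer writeFrame, long timeoutMillis) {
        CompletableFuture<R> f = new CompletableFuture<>();
        if (pending.size() >= maxPending) {      // approximate limit (see lead-in)
            f.completeExceptionally(new RejectedExecutionException("too many pending requests"));
            return f;
        }
        long reqId = requestIdGen.incrementAndGet();
        pending.put(reqId, f);
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            CompletableFuture<R> g = pending.remove(reqId);
            if (g != null) g.completeExceptionally(new TimeoutException("request " + reqId));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timer as soon as the future completes for any reason.
        f.whenComplete((r, e) -> timer.cancel(false));
        try {
            writeFrame.accept(reqId);
        } catch (RuntimeException e) {           // write failed: do not wait for the timeout
            if (pending.remove(reqId) != null) f.completeExceptionally(e);
        }
        return f;
    }

    /** Completes the matching request; returns false for a late or unknown response. */
    boolean onResponse(long requestId, R response) {
        CompletableFuture<R> f = pending.remove(requestId);
        return f != null && f.complete(response);
    }

    /** Fails everything still in flight, e.g. when the connection drops. */
    void failAll(Throwable cause) {
        for (Long id : pending.keySet()) {
            CompletableFuture<R> f = pending.remove(id);
            if (f != null) f.completeExceptionally(cause);
        }
    }

    int inflight() { return pending.size(); }
}
```

Because both the timeout task and onResponse go through pending.remove, only one of them can win for a given requestId, so a future is never completed twice.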
02 SPSC Ring Buffer (Lock‑free) with Condition‑based Wakeup
When to Choose SPSC
Route by key → shard, with a single consumer per shard; multiple producers are funneled through per‑connection buffers into a single SPSC queue.
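Implementation Notes
- Capacity is a power of two, so index = seq & mask
- head and tail are updated separately (the consumer owns head, the producer owns tail), which avoids write contention
- Avoid false sharing (padding / @Contended)
- Memory ordering: release on write (publish), acquire on read
// Full / empty checks
available = tail - head
if (available == capacity) full
if (available == 0) empty
// Notify after producing (only on the empty → non-empty transition)
if (wasEmpty) condition.signal();
// Consumer wait (hybrid strategy)
spin a few cycles → condition.await(timeout)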
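The hybrid wait described above (spin briefly, then park on a condition with a timeout) might look like the following. This is an illustrative sketch, not the broker's implementation: the class name HybridWaitSpscQueue and the spins / parkMillis parameters are assumptions, null elements are not supported, and padding against false sharing is omitted for brevity. It requires Java 9+ for Thread.onSpinWait().

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class HybridWaitSpscQueue<T> {
    private final int mask;
    private final AtomicReferenceArray<T> slots;
    private final AtomicLong head = new AtomicLong(); // written only by the consumer
    private final AtomicLong tail = new AtomicLong(); // written only by the producer
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    HybridWaitSpscQueue(int capacity) {
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        mask = capacity - 1;
        slots = new AtomicReferenceArray<>(capacity);
    }

    /** Producer thread only. Returns false when the ring is full. */
    boolean offer(T x) {
        long t = tail.get();
        long h = head.get();
        if (t - h == slots.length()) return false;
        boolean wasEmpty = (t == h);               // as observed by the producer
        slots.lazySet((int) (t & mask), x);
        tail.set(t + 1);                           // volatile write publishes the slot
        if (wasEmpty) {                            // signal only on empty -> non-empty
            lock.lock();
            try { notEmpty.signal(); } finally { lock.unlock(); }
        }
        return true;
    }

    /** Consumer thread only. Returns null when the ring is empty. */
    T poll() {
        long h = head.get();
        if (tail.get() == h) return null;
        int i = (int) (h & mask);
        T x = slots.get(i);
        slots.lazySet(i, null);                    // drop the reference for GC
        head.set(h + 1);                           // frees the slot for the producer
        return x;
    }

    /** Consumer thread only. Spins up to `spins` times, then parks for at most `parkMillis`. */
    T take(int spins, long parkMillis) throws InterruptedException {
        for (;;) {
            for (int i = 0; i < spins; i++) {
                T x = poll();
                if (x != null) return x;
                Thread.onSpinWait();
            }
            lock.lock();
            try {
                if (tail.get() == head.get()) {    // re-check under the lock before parking
                    notEmpty.await(parkMillis, TimeUnit.MILLISECONDS);
                }
            } finally {
                lock.unlock();
            }
        }
    }
}
```

The timeout on await matters here: the producer decides wasEmpty from a possibly stale read of head, so it can occasionally skip a signal the consumer needed. The bounded wait turns that missed wakeup into a short delay instead of a hang.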
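03 Off‑heap DirectByteBuffer: Pooled for Lower GC
Design (Pool + Slab)
- Preallocate direct memory in size classes of 256 B / 1 KB / 4 KB / 16 KB
- Allocation returns a BufferRef (id/offset/len); the ring stores references rather than payloads
- Return buffers to the pool after processing; always release on error paths as well
- Netty's PooledByteBufAllocator is recommended
// Send without a heap copy (illustrative)
ByteBuf buf = pooledAllocator.directBuffer(len);
buf.writeBytes(offHeapRef); // direct-to-direct copy; the payload never touches the heap
channel.writeAndFlush(buf); // Netty releases buf once the write completes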
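To make the pool + slab idea concrete, here is a JDK‑only sketch of a size‑classed pool that hands out BufferRef handles. It is an assumption‑laden illustration, not AeroMQ's allocator: the class name SlabPool, the field names of BufferRef (sizeClass, offset, len), and the slotsPerClass parameter are hypothetical. It is not thread‑safe, does not detect double release, and does not fall back to a larger class when one is exhausted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class SlabPool {
    static final int[] SIZE_CLASSES = {256, 1024, 4096, 16 * 1024};

    /** Lightweight handle stored in the ring instead of the payload itself. */
    static final class BufferRef {
        final int sizeClass;  // index into SIZE_CLASSES
        final int offset;     // byte offset inside that class's slab
        final int len;        // number of payload bytes actually used
        BufferRef(int sizeClass, int offset, int len) {
            this.sizeClass = sizeClass;
            this.offset = offset;
            this.len = len;
        }
    }

    private final ByteBuffer[] slabs = new ByteBuffer[SIZE_CLASSES.length];
    private final List<ArrayDeque<Integer>> freeOffsets = new ArrayList<>();

    /** Preallocates one direct slab per size class, each holding slotsPerClass slots. */
    SlabPool(int slotsPerClass) {
        for (int c = 0; c < SIZE_CLASSES.length; c++) {
            slabs[c] = ByteBuffer.allocateDirect(SIZE_CLASSES[c] * slotsPerClass);
            ArrayDeque<Integer> offsets = new ArrayDeque<>();
            for (int s = 0; s < slotsPerClass; s++) offsets.add(s * SIZE_CLASSES[c]);
            freeOffsets.add(offsets);
        }
    }

    /** Returns a slot from the smallest fitting class, or null if that class is exhausted. */
    BufferRef acquire(int len) {
        for (int c = 0; c < SIZE_CLASSES.length; c++) {
            if (len <= SIZE_CLASSES[c]) {
                Integer offset = freeOffsets.get(c).poll();
                return offset == null ? null : new BufferRef(c, offset, len);
            }
        }
        throw new IllegalArgumentException("payload exceeds largest size class: " + len);
    }

    /** A view over exactly the referenced bytes; position 0, limit ref.len. */
    ByteBuffer view(BufferRef ref) {
        ByteBuffer d = slabs[ref.sizeClass].duplicate();
        d.position(ref.offset);
        d.limit(ref.offset + ref.len);
        return d.slice();
    }

    /** Returns the slot to its free list; call this on error paths too. */
    void release(BufferRef ref) {
        freeOffsets.get(ref.sizeClass).push(ref.offset);
    }
}
```

A typical caller would acquire a ref, fill view(ref) with the payload, enqueue only the ref, and release it in a finally block once the consumer has processed the message.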
Memory Stats
// Off-heap MemoryStats (example output)
active=128, allocated=2048, released=1920,
memory=64,000,000 bytes, pools=[small=240, medium=120, large=60]
Edge Cases & Notes
- Monitor memory fragmentation and adjust the slab configuration if needed
- Memory allocated directly through DirectByteBuffer, outside the pool, is not managed or tracked by the pool
05A Observability & Metrics
inflight.count, timeout.count
ring.depth, drain.rate
direct.used, pool.{small,medium,large}
bytes.in/out, conn.count
05 Benchmarks & Visualization
Run BenchmarkRunner to produce CSV output, then use scripts/visualize_benchmark.py to generate charts and an HTML report.
06A Security & Reliability
- Connection authentication and command‑level authorization (optional)
- Pending requests are cleaned up on timeout and on disconnect to prevent resource leaks
- Backpressure and queue capacity limits prevent unbounded memory growth
06 Troubleshooting
- mvn not found: run install-maven.bat or add Maven to PATH manually
- Garbled Chinese characters: use quick-start-fixed.bat, or run chcp 65001 first
- Port in use: check for and free port 8888 (Windows: taskkill /PID <PID> /F)
06B FAQ
Why not MPSC instead of SPSC?
Will requestId overflow?
07 Roadmap
- Persistent backends (RocksDB / PostgreSQL)
- Multi‑node clustering & admin console
- Metrics & monitoring integration
04 Reference Snippets (Pseudo‑code)
// RequestId → CompletableFuture
long reqId = requestIdGen.incrementAndGet();
CompletableFuture<Response> f = new CompletableFuture<>();
pendingRequests.put(reqId, f);
// ... write to channel (with requestId)
scheduler.schedule(() -> {
var g = pendingRequests.remove(reqId);
if (g != null) g.completeExceptionally(new TimeoutException());
}, timeout, MILLISECONDS);
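// SPSC Ring Buffer (core idea): one producer thread calls offer, one consumer thread calls poll
final class SpscRing {
    private static final int CAPACITY = 1 << 16; // power of two
    private static final int MASK = CAPACITY - 1;
    private final Object[] slots = new Object[CAPACITY];
    private volatile long head = 0; // next slot to read; written only by the consumer
    private volatile long tail = 0; // next slot to write; written only by the producer

    boolean offer(Object x) {
        long t = tail; long h = head;
        if (t - h == CAPACITY) return false; // full
        slots[(int) (t & MASK)] = x;         // plain store, published by the volatile write below
        tail = t + 1;                        // volatile write (release)
        return true;
    }

    Object poll() {
        long h = head; long t = tail;        // volatile read of tail (acquire)
        if (t - h == 0) return null;         // empty
        int i = (int) (h & MASK);
        Object x = slots[i];
        slots[i] = null;                     // clear the slot so the element can be collected
        head = h + 1;                        // volatile write frees the slot for the producer
        return x;
    }
}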